
# CrunchyBridge Implementation Guide for Reference Data

## 🎯 Overview

This guide walks through migrating your enhanced reference data system from local PostgreSQL to CrunchyBridge, including schema deployment, data migration, and historical change tracking setup.


## 📋 Prerequisites

### 1. CrunchyBridge Account Setup

```bash
# Install the CrunchyBridge CLI
npm install -g @crunchydata/bridge-cli

# Log in to your account
bridge login
```

### 2. Project Dependencies

```bash
# Core dependencies (the connection code below uses the postgres-js driver)
npm install drizzle-orm drizzle-kit postgres
npm install -D tsx dotenv

# Optional: database management tools
npm install postgres-migrations
```

### 3. Environment Configuration

```bash
# .env.production
DATABASE_URL="postgres://username:password@cluster-id.aws-us-east-1.crunchydata.com:5432/dbname?sslmode=require"
CRUNCHY_CLUSTER_ID="your-cluster-id"
CRUNCHY_DATABASE_NAME="your-database-name"

# Development/local (for comparison)
DATABASE_URL_LOCAL="postgres://user:pass@localhost:5432/your_local_db"
```
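
It can help to fail fast when a required variable is missing rather than failing mid-migration. A minimal sketch (the `src/config/env.ts` path and exported shape are assumptions, not part of the guide's layout):

```ts
// src/config/env.ts -- fail fast on missing configuration (hypothetical helper)
const required = ['DATABASE_URL'] as const;

for (const key of required) {
  if (!process.env[key]) {
    throw new Error(`Missing required environment variable: ${key}`);
  }
}

export const env = {
  databaseUrl: process.env.DATABASE_URL!,
};
```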

## 🚀 Phase 1: CrunchyBridge Cluster Setup

### 1. Create Cluster

```bash
# Create a new cluster
bridge clusters create \
  --name reference-data-prod \
  --plan hobby-2 \
  --region us-east-1

# Get connection details
bridge clusters show reference-data-prod
```

### 2. Database Configuration

```sql
-- Connect to your cluster and run initial setup
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pg_trgm";
CREATE EXTENSION IF NOT EXISTS "btree_gin";

-- Create application user (optional but recommended)
CREATE USER app_user WITH PASSWORD 'secure_password_here';
GRANT CONNECT ON DATABASE your_db_name TO app_user;
```

## 🔧 Phase 2: Drizzle Schema Deployment

### 1. Project Structure

```
src/
├── db/
│   ├── schema/
│   │   ├── reference-data.ts    # Your Drizzle schema
│   │   └── index.ts
│   ├── migrations/
│   ├── migrate.ts
│   └── connection.ts
├── scripts/
│   ├── migrate-data.ts
│   ├── verify-migration.ts
│   └── setup-historical-tracking.ts
└── drizzle.config.ts
```

### 2. Drizzle Configuration

```ts
// drizzle.config.ts
import type { Config } from 'drizzle-kit';
import { config } from 'dotenv';

config({ path: '.env.production' });

export default {
  schema: './src/db/schema/*.ts',
  out: './src/db/migrations',
  driver: 'pg',
  dbCredentials: {
    connectionString: process.env.DATABASE_URL!,
    ssl: true,
  },
  verbose: true,
  strict: true,
} satisfies Config;
```
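
Note that this config shape (`driver: 'pg'` with `connectionString`) matches drizzle-kit releases before 0.21. If you install a newer drizzle-kit, the equivalent config looks roughly like the sketch below; verify the exact fields against the version you pin:

```ts
// drizzle.config.ts -- shape for drizzle-kit 0.21+ (a sketch; confirm against your version)
import { defineConfig } from 'drizzle-kit';
import { config } from 'dotenv';

config({ path: '.env.production' });

export default defineConfig({
  dialect: 'postgresql',
  schema: './src/db/schema/*.ts',
  out: './src/db/migrations',
  dbCredentials: {
    url: process.env.DATABASE_URL!,
  },
  verbose: true,
  strict: true,
});
```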

### 3. Database Connection

```ts
// src/db/connection.ts
import { drizzle } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';
import * as schema from './schema';

const connectionString = process.env.DATABASE_URL!;

// Configure for CrunchyBridge
const sql = postgres(connectionString, {
  ssl: 'require',
  max: 10,
  idle_timeout: 20,
  connect_timeout: 30,
});

export const db = drizzle(sql, { schema });
export { sql };
```
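
A quick smoke test confirms the connection settings work end to end before any migration runs. A minimal sketch (the script path is an assumption):

```ts
// scripts/smoke-test.ts -- one-off connectivity check (hypothetical script)
import { sql } from '../src/db/connection';

async function main() {
  // SELECT 1 round-trips through the pool without touching application tables
  const [row] = await sql`SELECT 1 AS ok`;
  console.log('Connection OK:', row.ok === 1);
  await sql.end();
}

main().catch((err) => {
  console.error('Connection failed:', err);
  process.exit(1);
});
```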

### 4. Generate and Apply Migrations

```bash
# Generate migration files
npx drizzle-kit generate:pg

# Review generated migrations in src/db/migrations/

# Apply migrations to CrunchyBridge
npx drizzle-kit push:pg
```
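
The project structure above also lists `src/db/migrate.ts`, which the guide never shows; it is handy when you want to apply the generated SQL migrations programmatically (e.g., from CI) rather than via `push`. A sketch using drizzle-orm's postgres-js migrator (note that on drizzle-kit 0.21+ the CLI commands above drop the `:pg` suffix):

```ts
// src/db/migrate.ts -- apply generated migrations programmatically (a sketch)
import { drizzle } from 'drizzle-orm/postgres-js';
import { migrate } from 'drizzle-orm/postgres-js/migrator';
import postgres from 'postgres';

async function main() {
  // max: 1 -- migrations should run on a single connection
  const sql = postgres(process.env.DATABASE_URL!, { ssl: 'require', max: 1 });
  const db = drizzle(sql);

  await migrate(db, { migrationsFolder: './src/db/migrations' });
  console.log('✅ Migrations applied');

  await sql.end();
}

main().catch((err) => {
  console.error('Migration failed:', err);
  process.exit(1);
});
```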

## 📊 Phase 3: Data Migration

### 1. Export Data from Local PostgreSQL

```ts
// scripts/export-reference-data.ts
import { sql as localSql } from '../src/db/connection-local';
import fs from 'fs/promises';

const exportTables = [
  'original_sources',
  'country_iso',
  'fao_major_areas',
  'gear_types_fao',
  'gear_types_cbp',
  'vessel_types',
  'vessel_hull_material',
  'rfmos',
  'gear_types_relationship_fao_cbp',
  'country_iso_eu',
];

async function exportReferenceData() {
  const exportData: Record<string, any[]> = {};

  for (const table of exportTables) {
    console.log(`📤 Exporting ${table}...`);

    // Assumes every reference table carries a created_at column
    const result = await localSql.unsafe(`
      SELECT * FROM ${table}
      ORDER BY created_at ASC
    `);

    exportData[table] = result;
    console.log(`✅ Exported ${result.length} records from ${table}`);
  }

  // Save to a JSON file with timestamp
  const timestamp = new Date().toISOString().split('T')[0];
  const filename = `reference-data-export-${timestamp}.json`;

  await fs.writeFile(filename, JSON.stringify(exportData, null, 2));
  console.log(`💾 Export completed: ${filename}`);

  return exportData;
}

exportReferenceData().catch(console.error);
```

### 2. Import Data to CrunchyBridge

```ts
// scripts/import-reference-data.ts
import { db } from '../src/db/connection';
import * as schema from '../src/db/schema';
import fs from 'fs/promises';

async function importReferenceData(filename: string) {
  console.log(`📥 Loading data from ${filename}...`);

  // Note: JSON.parse leaves timestamps as ISO strings; if your timestamp
  // columns use Drizzle's date mode, revive them to Date objects first.
  const data = JSON.parse(await fs.readFile(filename, 'utf-8'));

  // Import in dependency order
  const importOrder = [
    { table: 'original_sources', schema: schema.originalSources },
    { table: 'country_iso', schema: schema.countryIso },
    { table: 'fao_major_areas', schema: schema.faoMajorAreas },
    { table: 'gear_types_fao', schema: schema.gearTypesFao },
    { table: 'gear_types_cbp', schema: schema.gearTypesCbp },
    { table: 'vessel_types', schema: schema.vesselTypes },
    { table: 'vessel_hull_material', schema: schema.vesselHullMaterial },
    { table: 'rfmos', schema: schema.rfmos },
    { table: 'gear_types_relationship_fao_cbp', schema: schema.gearTypesRelationshipFaoCbp },
    { table: 'country_iso_eu', schema: schema.countryIsoEu },
  ];

  for (const { table, schema: tableSchema } of importOrder) {
    const records = data[table];
    if (!records || records.length === 0) {
      console.log(`⚠️ No data found for ${table}`);
      continue;
    }

    console.log(`📤 Importing ${records.length} records to ${table}...`);

    // Import in batches of 100
    const batchSize = 100;
    for (let i = 0; i < records.length; i += batchSize) {
      const batch = records.slice(i, i + batchSize);
      await db.insert(tableSchema).values(batch);
    }

    console.log(`✅ Successfully imported ${records.length} records to ${table}`);
  }

  console.log('🎉 Data import completed!');
}

// Run the import
const exportFile = process.argv[2] || `reference-data-export-${new Date().toISOString().split('T')[0]}.json`;
importReferenceData(exportFile).catch(console.error);
```
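
If the import fails partway through, a plain re-run will hit duplicate-key errors on the rows that already landed. One option, sketched below, is to wrap each table's batches in a transaction and make the inserts idempotent with `onConflictDoNothing()`; this is an alternative to the loop above, not part of the original script:

```ts
// Idempotent variant of the batch loop (a sketch, not the original script)
import { db } from '../src/db/connection';

async function importBatches(tableSchema: any, records: any[], batchSize = 100) {
  // All batches for one table commit or roll back together
  await db.transaction(async (tx) => {
    for (let i = 0; i < records.length; i += batchSize) {
      const batch = records.slice(i, i + batchSize);
      // ON CONFLICT DO NOTHING skips rows that already exist, so re-runs are safe
      await tx.insert(tableSchema).values(batch).onConflictDoNothing();
    }
  });
}
```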

### 3. Verification Script

```ts
// scripts/verify-migration.ts
import { db } from '../src/db/connection';
import * as schema from '../src/db/schema';
import { count, eq, sql } from 'drizzle-orm'; // sql is needed for the ANY() query below

async function verifyMigration() {
  console.log('🔍 Verifying migration...\n');

  const tables = [
    { name: 'original_sources', schema: schema.originalSources },
    { name: 'country_iso', schema: schema.countryIso },
    { name: 'fao_major_areas', schema: schema.faoMajorAreas },
    { name: 'gear_types_fao', schema: schema.gearTypesFao },
    { name: 'gear_types_cbp', schema: schema.gearTypesCbp },
    { name: 'vessel_types', schema: schema.vesselTypes },
    { name: 'vessel_hull_material', schema: schema.vesselHullMaterial },
    { name: 'rfmos', schema: schema.rfmos },
    { name: 'gear_types_relationship_fao_cbp', schema: schema.gearTypesRelationshipFaoCbp },
    { name: 'country_iso_eu', schema: schema.countryIsoEu },
  ];

  let totalRecords = 0;

  for (const { name, schema: tableSchema } of tables) {
    const [result] = await db.select({ count: count() }).from(tableSchema);
    console.log(`📊 ${name}: ${result.count} records`);
    totalRecords += result.count;
  }

  console.log(`\n📈 Total records: ${totalRecords}`);

  // Test source tracking
  console.log('\n🔗 Testing source relationships...');

  const sourcesWithCounts = await db
    .select({
      shortname: schema.originalSources.sourceShortname,
      status: schema.originalSources.status,
      sizeApprox: schema.originalSources.sizeApprox,
      refreshDate: schema.originalSources.refreshDate,
    })
    .from(schema.originalSources)
    .where(eq(schema.originalSources.status, 'LOADED'));

  sourcesWithCounts.forEach(source => {
    console.log(`${source.shortname}: ${source.status} (${source.sizeApprox} records, refreshed: ${source.refreshDate})`);
  });

  // Test multi-type queries
  console.log('\n🔍 Testing multi-type source queries...');

  const gearSources = await db
    .select({ shortname: schema.originalSources.sourceShortname })
    .from(schema.originalSources)
    .where(sql`'GEAR_DATA' = ANY(${schema.originalSources.sourceTypes})`);

  console.log(`🔧 Sources with GEAR_DATA: ${gearSources.map(s => s.shortname).join(', ')}`);

  console.log('\n🎉 Migration verification completed!');
}

verifyMigration().catch(console.error);
```

## 📈 Phase 4: Historical Change Tracking Setup

### 1. Historical Tables Schema

```ts
// src/db/schema/historical-tracking.ts
import { pgTable, uuid, text, timestamp, jsonb, index } from 'drizzle-orm/pg-core';

export const historicalChanges = pgTable('historical_changes', {
  id: uuid('id').primaryKey().defaultRandom(),
  tableName: text('table_name').notNull(),
  recordId: uuid('record_id').notNull(),
  operation: text('operation').notNull(), // 'INSERT', 'UPDATE', 'DELETE'
  oldValues: jsonb('old_values'),
  newValues: jsonb('new_values'),
  changedBy: text('changed_by'),
  changedAt: timestamp('changed_at').defaultNow(),
  changeReason: text('change_reason'),
}, (table) => ({
  tableNameIdx: index('idx_historical_table_name').on(table.tableName),
  recordIdIdx: index('idx_historical_record_id').on(table.recordId),
  changedAtIdx: index('idx_historical_changed_at').on(table.changedAt),
  tableRecordIdx: index('idx_historical_table_record').on(table.tableName, table.recordId),
}));

export type HistoricalChange = typeof historicalChanges.$inferSelect;
export type NewHistoricalChange = typeof historicalChanges.$inferInsert;
```

### 2. Trigger Functions Setup

```sql
-- scripts/setup-historical-triggers.sql
-- Generic function to track changes
CREATE OR REPLACE FUNCTION track_historical_changes()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'DELETE' THEN
    INSERT INTO historical_changes (table_name, record_id, operation, old_values, changed_at)
    VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', to_jsonb(OLD), NOW());
    RETURN OLD;
  ELSIF TG_OP = 'UPDATE' THEN
    INSERT INTO historical_changes (table_name, record_id, operation, old_values, new_values, changed_at)
    VALUES (TG_TABLE_NAME, NEW.id, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW), NOW());
    RETURN NEW;
  ELSIF TG_OP = 'INSERT' THEN
    INSERT INTO historical_changes (table_name, record_id, operation, new_values, changed_at)
    VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', to_jsonb(NEW), NOW());
    RETURN NEW;
  END IF;
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Create triggers for each table you want to track
CREATE TRIGGER country_iso_history_trigger
  AFTER INSERT OR UPDATE OR DELETE ON country_iso
  FOR EACH ROW EXECUTE FUNCTION track_historical_changes();

CREATE TRIGGER fao_major_areas_history_trigger
  AFTER INSERT OR UPDATE OR DELETE ON fao_major_areas
  FOR EACH ROW EXECUTE FUNCTION track_historical_changes();

-- Add more triggers as needed...
```
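
The project structure lists `scripts/setup-historical-tracking.ts`, which the guide never shows. A minimal runner that applies the SQL file above through the existing postgres-js client might look like this sketch:

```ts
// scripts/setup-historical-tracking.ts -- apply the trigger SQL (a sketch)
import { sql } from '../src/db/connection';
import fs from 'fs/promises';

async function setupHistoricalTracking() {
  const ddl = await fs.readFile('scripts/setup-historical-triggers.sql', 'utf-8');

  // unsafe() is appropriate here: we are executing our own DDL file,
  // not interpolating user input
  await sql.unsafe(ddl);

  console.log('✅ Historical tracking triggers installed');
  await sql.end();
}

setupHistoricalTracking().catch((err) => {
  console.error('Failed to install triggers:', err);
  process.exit(1);
});
```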

### 3. Historical Tracking Service

```ts
// src/services/historical-tracking.ts
import { db } from '../db/connection';
import { historicalChanges } from '../db/schema/historical-tracking';
import { gte, lte, eq, and } from 'drizzle-orm';

export class HistoricalTrackingService {

  async getTableHistory(tableName: string, recordId?: string, days: number = 30) {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - days);

    const whereConditions = [
      eq(historicalChanges.tableName, tableName),
      gte(historicalChanges.changedAt, cutoffDate),
    ];

    if (recordId) {
      whereConditions.push(eq(historicalChanges.recordId, recordId));
    }

    return await db
      .select()
      .from(historicalChanges)
      .where(and(...whereConditions))
      .orderBy(historicalChanges.changedAt);
  }

  async getChangesByDateRange(startDate: Date, endDate: Date) {
    return await db
      .select()
      .from(historicalChanges)
      .where(and(
        gte(historicalChanges.changedAt, startDate),
        lte(historicalChanges.changedAt, endDate)
      ))
      .orderBy(historicalChanges.changedAt);
  }

  async cleanupOldChanges(retentionDays: number = 365) {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

    const result = await db
      .delete(historicalChanges)
      .where(lte(historicalChanges.changedAt, cutoffDate));

    console.log(`🧹 Cleaned up historical changes older than ${retentionDays} days`);
    return result;
  }
}
```
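
Typical usage from a maintenance script might look like the following sketch (`country_iso` is simply one of the tracked tables above):

```ts
// Example usage of HistoricalTrackingService (a sketch)
import { HistoricalTrackingService } from '../src/services/historical-tracking';

const tracking = new HistoricalTrackingService();

async function main() {
  // Last 7 days of changes to country_iso
  const recent = await tracking.getTableHistory('country_iso', undefined, 7);
  console.log(`country_iso changes this week: ${recent.length}`);

  // Retain one year of history
  await tracking.cleanupOldChanges(365);
}

main().catch(console.error);
```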

## 🔧 Phase 5: Production Setup

### 1. Connection Pooling

```ts
// src/db/connection-pool.ts
import { drizzle } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';
import * as schema from './schema';

// Production connection with proper pooling
const sql = postgres(process.env.DATABASE_URL!, {
  ssl: 'require',
  max: 20,             // Maximum connections
  idle_timeout: 20,    // Close idle connections after 20s
  connect_timeout: 30, // Connection timeout in seconds
  prepare: false,      // Disable prepared statements (required behind transaction-mode poolers)
});

export const db = drizzle(sql, {
  schema,
  logger: process.env.NODE_ENV === 'development',
});

// Graceful shutdown
process.on('SIGINT', async () => {
  console.log('🔌 Closing database connections...');
  await sql.end();
  process.exit(0);
});
```

### 2. Environment-Specific Configuration

```ts
// src/config/database.ts
interface DatabaseConfig {
  connectionString: string;
  ssl: boolean;
  maxConnections: number;
  idleTimeout: number;
  connectTimeout: number;
}

const configs: Record<string, DatabaseConfig> = {
  development: {
    connectionString: process.env.DATABASE_URL_LOCAL!,
    ssl: false,
    maxConnections: 5,
    idleTimeout: 20,
    connectTimeout: 10,
  },
  production: {
    connectionString: process.env.DATABASE_URL!,
    ssl: true,
    maxConnections: 20,
    idleTimeout: 20,
    connectTimeout: 30,
  },
};

export const dbConfig = configs[process.env.NODE_ENV || 'development'];
```
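
To put this config to work, the connection module can build the postgres-js client from `dbConfig` instead of hard-coding values; a sketch:

```ts
// src/db/connection.ts variant driven by dbConfig (a sketch)
import { drizzle } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';
import * as schema from './schema';
import { dbConfig } from '../config/database';

const sql = postgres(dbConfig.connectionString, {
  ssl: dbConfig.ssl ? 'require' : false,
  max: dbConfig.maxConnections,
  idle_timeout: dbConfig.idleTimeout,
  connect_timeout: dbConfig.connectTimeout,
});

export const db = drizzle(sql, { schema });
```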

### 3. Health Check Endpoint

```ts
// src/health/database.ts
import { db } from '../db/connection';
import { originalSources } from '../db/schema';
import { count } from 'drizzle-orm';

export async function checkDatabaseHealth() {
  try {
    // Test basic connectivity
    const [result] = await db.select({ count: count() }).from(originalSources);

    return {
      status: 'healthy',
      timestamp: new Date().toISOString(),
      recordCount: result.count,
      latency: 'measured_in_app', // Placeholder -- see the latency sketch below
    };
  } catch (error) {
    return {
      status: 'unhealthy',
      timestamp: new Date().toISOString(),
      error: error instanceof Error ? error.message : 'Unknown error',
    };
  }
}
```
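
The `latency` field above is a placeholder. Measuring it only takes a timer around the query; a sketch of the same check with real numbers:

```ts
// checkDatabaseHealth with actual latency measurement (a sketch)
import { performance } from 'node:perf_hooks';
import { db } from '../db/connection';
import { originalSources } from '../db/schema';
import { count } from 'drizzle-orm';

export async function checkDatabaseHealthWithLatency() {
  const start = performance.now();
  try {
    const [result] = await db.select({ count: count() }).from(originalSources);
    return {
      status: 'healthy' as const,
      timestamp: new Date().toISOString(),
      recordCount: result.count,
      latencyMs: Math.round(performance.now() - start),
    };
  } catch (error) {
    return {
      status: 'unhealthy' as const,
      timestamp: new Date().toISOString(),
      error: error instanceof Error ? error.message : 'Unknown error',
    };
  }
}
```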

## 📊 Phase 6: Monitoring & Maintenance

### 1. Monitoring Script

```ts
// scripts/monitor-reference-data.ts
import { db } from '../src/db/connection';
import * as schema from '../src/db/schema';
import { count } from 'drizzle-orm';

async function monitorReferenceData() {
  console.log('📊 Reference Data Health Check');
  console.log('==============================\n');

  // Check source statuses
  const sources = await db
    .select({
      shortname: schema.originalSources.sourceShortname,
      status: schema.originalSources.status,
      refreshDate: schema.originalSources.refreshDate,
      sizeApprox: schema.originalSources.sizeApprox,
    })
    .from(schema.originalSources);

  const pendingSources = sources.filter(s => s.status === 'PENDING');
  const failedSources = sources.filter(s => s.status === 'FAILED');
  const staleSources = sources.filter(s => {
    if (!s.refreshDate) return true;
    const daysSinceRefresh = (Date.now() - s.refreshDate.getTime()) / (1000 * 60 * 60 * 24);
    return daysSinceRefresh > 90; // 3 months
  });

  console.log(`✅ Total sources: ${sources.length}`);
  console.log(`⏳ Pending sources: ${pendingSources.length}`);
  console.log(`❌ Failed sources: ${failedSources.length}`);
  console.log(`⚠️ Stale sources (>90 days): ${staleSources.length}\n`);

  if (failedSources.length > 0) {
    console.log('❌ Failed Sources:');
    failedSources.forEach(s => console.log(` - ${s.shortname}`));
    console.log();
  }

  if (staleSources.length > 0) {
    console.log('⚠️ Stale Sources:');
    staleSources.forEach(s => console.log(` - ${s.shortname} (last refresh: ${s.refreshDate})`));
    console.log();
  }

  // Check data integrity
  const [countriesCount] = await db.select({ count: count() }).from(schema.countryIso);
  const [gearRelCount] = await db.select({ count: count() }).from(schema.gearTypesRelationshipFaoCbp);

  console.log('📈 Data Integrity:');
  console.log(` Countries: ${countriesCount.count} (expected: ~249)`);
  console.log(` Gear relationships: ${gearRelCount.count} (expected: ~69)`);

  console.log('\n🎉 Health check completed!');
}

// Schedule this to run daily
monitorReferenceData().catch(console.error);
```
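
To run the check daily from within the app rather than via cron on the host, one option is the node-cron package (an extra dependency, not in the guide's install list); the sketch also assumes `monitorReferenceData` is exported from the script above:

```ts
// Daily schedule via node-cron (a sketch; assumes: npm install node-cron)
import { schedule } from 'node-cron';
import { monitorReferenceData } from './monitor-reference-data';

// Run every day at 06:00 server time
schedule('0 6 * * *', () => {
  monitorReferenceData().catch(console.error);
});
```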

### 2. Backup Strategy

```bash
#!/bin/bash
# scripts/backup-reference-data.sh

# Daily backup of reference data
BACKUP_DIR="/backups/reference-data"
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/reference-data-$DATE.sql"

mkdir -p "$BACKUP_DIR"

# Use the CrunchyBridge connection string
pg_dump "$DATABASE_URL" \
  --verbose \
  --no-owner \
  --no-privileges \
  --table="original_sources*" \
  --table="country_iso*" \
  --table="fao_major_areas*" \
  --table="gear_types_*" \
  --table="vessel_*" \
  --table="rfmos*" \
  --table="historical_changes*" \
  > "$BACKUP_FILE"

# Compress the backup
gzip "$BACKUP_FILE"

# Keep only the last 30 days of backups
find "$BACKUP_DIR" -name "*.sql.gz" -mtime +30 -delete

echo "✅ Backup completed: $BACKUP_FILE.gz"
```

## 🚀 Deployment Checklist

### Pre-Migration

- CrunchyBridge cluster created and configured
- Environment variables set up
- Local data export completed and verified
- Schema migrations generated and reviewed

### Migration Day

- Apply schema migrations to CrunchyBridge
- Import reference data in dependency order
- Run verification scripts
- Set up historical change tracking
- Configure monitoring and health checks

### Post-Migration

- Update application connection strings
- Test all reference data queries
- Verify foreign key relationships (see the sketch after this list)
- Set up automated backups
- Configure alerts for failed sources
- Document rollback procedures
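
For the foreign-key item, a quick approach is to enumerate the FK constraints that made it across and compare the list against the local database; a sketch using the raw postgres-js client:

```ts
// scripts/list-foreign-keys.ts -- enumerate FK constraints for manual comparison (a sketch)
import { sql } from '../src/db/connection';

async function listForeignKeys() {
  const rows = await sql`
    SELECT conname AS constraint_name,
           conrelid::regclass AS table_name,
           confrelid::regclass AS references_table
    FROM pg_constraint
    WHERE contype = 'f'
    ORDER BY conrelid::regclass::text
  `;

  rows.forEach(r => console.log(`${r.table_name} -> ${r.references_table} (${r.constraint_name})`));
  await sql.end();
}

listForeignKeys().catch(console.error);
```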

### Ongoing Maintenance

- Daily health checks
- Weekly source status reviews
- Monthly historical data cleanup
- Quarterly performance analysis

## 🔄 Next Steps for Historical Change Tracking

Tomorrow's implementation should focus on:

1. Deploy historical tracking schema to CrunchyBridge
2. Set up trigger functions for change capture
3. Implement change tracking service with TypeScript
4. Create historical data API endpoints for querying changes (a sketch follows this list)
5. Test change tracking with sample data updates
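
For item 4, one possible shape for a read-only history endpoint, sketched here with Express (an assumed dependency, not in the guide's install list):

```ts
// src/api/history.ts -- read-only change history endpoint (a sketch using Express)
import express from 'express';
import { HistoricalTrackingService } from '../services/historical-tracking';

const app = express();
const tracking = new HistoricalTrackingService();

// GET /api/history/country_iso?recordId=<uuid>&days=7
app.get('/api/history/:table', async (req, res) => {
  try {
    const days = Number(req.query.days ?? 30);
    const recordId = typeof req.query.recordId === 'string' ? req.query.recordId : undefined;
    const changes = await tracking.getTableHistory(req.params.table, recordId, days);
    res.json(changes);
  } catch (err) {
    res.status(500).json({ error: err instanceof Error ? err.message : 'Unknown error' });
  }
});

app.listen(3000, () => console.log('History API listening on :3000'));
```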

This foundation provides a robust, scalable reference data system ready for production use with full historical tracking capabilities! 🎉